У меня есть одна проблема, одна из многих.
Не могу понять, что там не так в моем <стреаминг>, когда создаешь <дирецт стреам>, то ети ихние <рдд> все-таки не такие, как обычные. Помогите, люди добрые, а то спать не пойду седня без решения
Внизу кусок кода, процедура <process> получает значение, я могу его распечать. Подобная, 1 в 1, программа работает без проблем
<
def process(msgs):
if not msgs.isEmpty():
msg_df = ss.createDataFrame(msgs, schema=["pid","src","tname","tunixtimestamp","tvalue","str_pdate"])
msg_df.show(5)
msg_to_write = msg_df.select("pid","src","tname","tunixtimestamp","tvalue",(msg_df.src).alias('pSource'),(msg_df.str_pdate).alias('pdate'))
#t1 = time()
msg_to_write.write.mode("append").partitionBy('pSource','pdate').saveAsTable('dbt.t_consumer')
#t2 = time()
#print( "Time to load = %0.f seconds" % (t2-t1))
if __name__ == "__main__":
ss = SparkSession.builder.enableHiveSupport().getOrCreate()
sc = ss.sparkContext
ssc = StreamingContext(sc, 3)
kafkaParams = {
"group.id": '',
"key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
"value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
"enable.auto.commit": "false"
}
topic = ["/user/t/mystream:mytopic"]
consumerStrategy = ConsumerStrategies.Subscribe(sc, topic, kafkaParams)
locationStrategy = LocationStrategies.PreferConsistent(sc)
messagesDS = KafkaUtils.createDirectStream(ssc, locationStrategy, consumerStrategy)
#messagesDS.pprint()
msg_rec = messagesDS.map(lambda x: x[1])
j_msg = msg_rec.map(lambda r: json.loads(r))
#j_msg.pprint()
j_msg.foreachRDD(process)
ssc.start()
ssc.awaitTermination(30)
ssc.stop()
17/09/09 15:38:20 WARN StreamingContext: Dynamic Allocation is enabled for this application. Enabling Dynamic allocation for Spark Streaming applications can cause data loss if Write Ahead Log is not enabled for non-replayable sources like Flume. See the programming guide for details on how to enable the Write Ahead Log.
process called, not empty
17/09/09 15:38:28 ERROR JobScheduler: Error running job streaming job 1504989505000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/opt/mapr/spark/spark-2.1.0/python/lib/pyspark.zip/pyspark/streaming/util.py", line 65, in call
r = self.func(t, *rdds)
File "/opt/mapr/spark/spark-2.1.0/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 159, in <lambda>
func = lambda t, rdd: old_func(rdd)
File "/mapr/maprdev/user/t/consumer_3.py", line 27, in process
msg_df = ss.createDataFrame(msgs, schema=["pid","src","tname","tunixtimestamp","tvalue","str_pdate"])
File "/opt/mapr/spark/spark-2.1.0/python/lib/pyspark.zip/pyspark/sql/session.py", line 520, in createDataFrame
rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
File "/opt/mapr/spark/spark-2.1.0/python/lib/pyspark.zip/pyspark/sql/session.py", line 360, in _createFromRDD
struct = self._inferSchema(rdd, samplingRatio)
File "/opt/mapr/spark/spark-2.1.0/python/lib/pyspark.zip/pyspark/sql/session.py", line 331, in _inferSchema
first = rdd.first()
File "/opt/mapr/spark/spark-2.1.0/python/lib/pyspark.zip/pyspark/rdd.py", line 1364, in first
raise ValueError("RDD is empty")
ValueError: RDD is empty>
Pyspark streaming RDD problem
-
- Уже с Приветом
- Posts: 12065
- Joined: 15 Feb 2002 10:01
- Location: TX